package com.cscec8b.ct.consumer.bean;

import com.cscec8b.ct.common.bean.Consumer;
import com.cscec8b.ct.consumer.dao.HBaseDao;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Properties;

/**
 * @Copyright: Shanghai Definesys Company.All rights reserved.
 * @Description: Kafka consumer that polls the "ct" topic and stores each record in HBase as a CallLog.
 * @author: chuhaitao
 * @since: 2019/1/27 19:10
 * @history: 1.2019/1/27 created by chuhaitao
 */
public class CalllogConsumer implements Consumer {

    /** Classpath resource that holds the Kafka consumer configuration. */
    private static final String CONFIG_RESOURCE = "kafkaConsumer.properties";

    /** Name of the Kafka topic this consumer subscribes to. */
    private static final String TOPIC = "ct";

    /** Timeout, in milliseconds, passed to each {@code poll} call. */
    private static final long POLL_TIMEOUT_MS = 3000L;

    /**
     * Polls the {@code ct} topic in an endless loop and stores every record in HBase.
     *
     * <p>Each record value is printed to standard output, wrapped in a {@link CallLog}
     * whose flag is set to {@code "1"}, and handed to {@link HBaseDao#insertData}.
     * Records whose value is {@code null} are printed and then skipped.
     *
     * <p>The method only returns by throwing: the loop has no exit condition. Failures
     * (including {@link IOException}) are propagated to the caller instead of being
     * printed and swallowed, and the Kafka client is closed before the exception
     * leaves this method.
     *
     * @throws Exception if the configuration cannot be loaded, or if Kafka or HBase
     *                   access fails
     */
    public void consumer() throws Exception {
        Properties prop = loadConsumerProperties();

        // try-with-resources closes the Kafka client when the loop is left by an exception.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            // Subscribe to the topic.
            consumer.subscribe(Arrays.asList(TOPIC));

            HBaseDao hBaseDao = new HBaseDao();
            // Initialisation; per the original author's note this creates the
            // namespace and the table.
            hBaseDao.init();

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);

                for (ConsumerRecord<String, String> record : records) {
                    String value = record.value();
                    System.out.println("----" + value);
                    if (value == null) {
                        // A record without a value cannot be turned into a CallLog.
                        continue;
                    }

                    // Insert the data as an object rather than as the raw string.
                    CallLog log = new CallLog(value);
                    log.setFlag("1");
                    hBaseDao.insertData(log);
                }
            }
        }
    }

    /**
     * Loads the Kafka consumer configuration from the classpath.
     *
     * <p>The resource is resolved through the current thread's context class loader.
     *
     * @return the properties read from {@code kafkaConsumer.properties}
     * @throws FileNotFoundException if the resource is not on the classpath
     * @throws IOException           if the resource cannot be read
     */
    private static Properties loadConsumerProperties() throws IOException {
        Properties prop = new Properties();
        try (InputStream in = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream(CONFIG_RESOURCE)) {
            if (in == null) {
                // getResourceAsStream signals "not found" with null, not an exception.
                throw new FileNotFoundException(
                        "Classpath resource not found: " + CONFIG_RESOURCE);
            }
            prop.load(in);
        }
        return prop;
    }

    /**
     * Does nothing.
     *
     * <p>The Kafka client is created and closed inside {@link #consumer()}, so this
     * object holds no resources of its own.
     *
     * @throws IOException never thrown by this implementation
     */
    public void close() throws IOException {
        // Intentionally empty.
    }
}
